Skip to content

Add -n/--max-messages flag for debug mode#63

Merged
gmr merged 7 commits intomainfrom
feature/max-messages
Apr 3, 2026
Merged

Add -n/--max-messages flag for debug mode#63
gmr merged 7 commits intomainfrom
feature/max-messages

Conversation

@gmr
Copy link
Copy Markdown
Owner

@gmr gmr commented Apr 3, 2026

Summary

Add a -n N / --max-messages N CLI flag that causes each consumer process to shut down after processing N messages. When set, the MCP skips respawning exited processes, so the entire daemon exits once all consumers hit their limit.

Useful for debugging consumers against real queues without leaving them running indefinitely.

Usage: rejected -c config.yaml -o my_consumer -n 5

Closes #31

Changes

  • controller.py: Add -n/--max-messages arg, pass through to MCP
  • mcp.py: Accept max_messages, pass to Process kwargs, skip check_process_counts when set
  • process.py: Add max_messages property, check limit in on_processed and initiate graceful shutdown
  • tests/test_controller.py: Add max_messages to mock args fixture

Test plan

  • All 208 existing tests pass
  • Pre-commit hooks pass
  • Manual: rejected -c example.yaml -o consumer -n 3 processes 3 messages then exits

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added CLI option to limit the maximum number of messages processed per invocation; processing stops automatically when the limit is reached.
  • Documentation

    • Updated docs and examples to use top-level imports and the new docs site; renamed concurrent consumer from TransactionConsumer → FunctionalConsumer and updated message-type usage to use type.
  • Exports

    • Re-exported select consumer, exception, and context symbols at the package top level for simpler imports.
  • Tests

    • Updated tests and fixtures to include the new CLI argument and message-type changes.

Process N messages per consumer process then shut down. When set,
the MCP does not respawn exited processes, so the entire daemon
exits once all consumers have reached their limit.

Closes #31

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 3, 2026

Warning

Rate limit exceeded

@gmr has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 20 minutes and 23 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 20 minutes and 23 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 50e41477-2d88-4d86-8ed1-28be08e33823

📥 Commits

Reviewing files that changed from the base of the PR and between 65d3f62 and a367aad.

📒 Files selected for processing (4)
  • docs/consumer_howto.md
  • docs/index.md
  • pyproject.toml
  • rejected/controller.py
📝 Walkthrough

Walkthrough

Adds a CLI flag -n/--max-messages propagated from controller → MasterControlProgram → spawned Process to limit per-process consumed messages and trigger shutdown; renames TransactionConsumerFunctionalConsumer, replaces Message.message_type with Message.type, and updates exports, docs, and tests accordingly.

Changes

Cohort / File(s) Summary
Controller / MCP / Process (message limiting)
rejected/controller.py, rejected/mcp.py, rejected/process.py
Add -n/--max-messages CLI arg; MasterControlProgram.__init__ accepts max_messages and passes it into spawned Process kwargs; Process exposes max_messages property, increments _processed_count, and calls shutdown_connections() when limit reached.
Message model & accessors
rejected/models.py, rejected/consumer.py, rejected/testing.py, rejected/process.py
Rename Message.message_typeMessage.type; update consumer/property accessors and decoding to use msg.type (and corresponding test helpers).
Consumer rename & behavior docs
rejected/consumer.py, rejected/__init__.py
Rename TransactionConsumerFunctionalConsumer; update top-level exports in __init__.py to export FunctionalConsumer, ProcessingContext, and RejectedException while removing TransactionConsumer.
Tests
tests/test_controller.py, tests/test_consumer.py, tests/test_process.py
Update test helpers and fixtures to include max_messages in args and to construct models.Message with type= instead of message_type=; adjust assertions accordingly.
Docs / README / guides
README.md, CLAUDE.md, docs/*
Replace TransactionConsumer references with FunctionalConsumer; update examples/imports to import rejected and use rejected.* symbols; adjust docs for Message.type and other updated exports.

Sequence Diagram(s)

sequenceDiagram
    participant CLI
    participant Controller
    participant MCP as MasterControlProgram
    participant Process
    participant Broker as MessageBroker

    CLI->>Controller: start (--max-messages=N)
    Controller->>MCP: start(max_messages=N)
    MCP->>Process: spawn(kwargs{..., max_messages:N})
    Process->>Broker: consume message
    Broker-->>Process: deliver message
    Process->>Process: increment _processed_count
    alt _processed_count < max_messages
        Process->>Broker: ack / continue
    else _processed_count >= max_messages
        Process->>Process: log limit reached
        Process->>Process: shutdown_connections()
        Process->>MCP: exit/terminate
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

🐰 I count each hop, each tiny byte,
Nibbled flags guide my debug night.
When the Nth carrot rings the bell,
I tuck my ears — all is well.
Hoppity-hop, the consumer sleeps tight.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Out of Scope Changes check ⚠️ Warning The PR includes extensive out-of-scope changes: renamed TransactionConsumer to FunctionalConsumer, changed Message.message_type to Message.type, updated all documentation and imports, and refactored public API exports, all unrelated to the max-messages feature. Separate the max-messages feature implementation from the API migration changes (Consumer renaming, Message field rename, export reorganization) into distinct PRs to maintain focused, reviewable changes.
Docstring Coverage ⚠️ Warning Docstring coverage is 45.83% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the primary feature: adding a CLI flag for limiting message processing, which is the core functional change in this PR.
Linked Issues check ✅ Passed The PR implements the debug mode requirement from issue #31: adds -n/--max-messages CLI flag, passes it through the MCP layer, and implements shutdown logic after N messages are processed in the Process class.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/max-messages

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
rejected/controller.py (1)

166-174: Consider validating max_messages to reject non-positive values.

The argument accepts any integer, including 0 or negative values. A value of 0 would be falsy and act as "no limit" (potentially confusing), while negative values would cause immediate shutdown after the first message since _processed_count >= self.max_messages would always be true.

Add validation in main()
     if args.quantity is not None and args.consumer is None:
         parser.error('--qty requires --only')
+    if args.max_messages is not None and args.max_messages < 1:
+        parser.error('--max-messages must be at least 1')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/controller.py` around lines 166 - 174, The CLI currently allows zero
or negative values for the --max-messages argument; add validation in main() (or
via a custom argparse type) to reject non-positive integers for max_messages:
ensure parser.add_argument for '--max-messages' either uses a custom validator
like positive_int or, after parsing, checks args.max_messages and exits/raises
an error if args.max_messages is not None and args.max_messages <= 0; reference
the parser.add_argument call and the main() function and the max_messages
variable so the check prevents 0/negative values that would confuse the
_processed_count logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@rejected/controller.py`:
- Around line 166-174: The CLI currently allows zero or negative values for the
--max-messages argument; add validation in main() (or via a custom argparse
type) to reject non-positive integers for max_messages: ensure
parser.add_argument for '--max-messages' either uses a custom validator like
positive_int or, after parsing, checks args.max_messages and exits/raises an
error if args.max_messages is not None and args.max_messages <= 0; reference the
parser.add_argument call and the main() function and the max_messages variable
so the check prevents 0/negative values that would confuse the _processed_count
logic.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9cef54fb-507f-44eb-bec8-49ee08aeba29

📥 Commits

Reviewing files that changed from the base of the PR and between c69633e and 12915f2.

📒 Files selected for processing (4)
  • rejected/controller.py
  • rejected/mcp.py
  • rejected/process.py
  • tests/test_controller.py

gmr and others added 5 commits April 3, 2026 17:02
Add RejectedException and ProcessingContext to rejected.__init__
so consumers can use `rejected.Consumer`, `rejected.ProcessingContext`,
etc. directly. Update README examples to use top-level imports,
remove stale Python version badge, update docs URL to GH Pages.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
All code examples now use `import rejected` and `rejected.Consumer`,
`rejected.TransactionConsumer`, `rejected.ProcessingContext`, etc.
instead of sub-module imports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The AMQP property is `type`, and pika uses `properties.type`.
Rename the Pydantic field to match, eliminating the awkward
`message_type` indirection. Consumer.message_type property is
unchanged for backward compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
docs/migrating.md (1)

319-332: ⚠️ Potential issue | 🟡 Minor

Add TransactionConsumerFunctionalConsumer to the removed APIs table.

The table documents removed/replaced APIs but doesn't mention the TransactionConsumer rename. Users searching this table for migration guidance won't find it.

📝 Suggested addition
 | 3.x API | 4.0 Replacement |
 |---------|----------------|
 | `consumer.SmartConsumer` | `rejected.Consumer` (auto-deserializes) |
 | `consumer.PublishingConsumer` | `rejected.Consumer` (always could publish) |
+| `consumer.TransactionConsumer` | `rejected.FunctionalConsumer` |
 | `self.io_loop` | `asyncio.get_event_loop()` |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/migrating.md` around lines 319 - 332, The removed APIs table is missing
the rename for TransactionConsumer; add a table row mapping
`TransactionConsumer` to `FunctionalConsumer` in the same table where other
mappings like `consumer.SmartConsumer` → `rejected.Consumer` appear so users can
find migration guidance for TransactionConsumer; ensure the new row follows the
existing pipe-delimited format and placement (near other consumer/type renames)
and uses backticks for the symbols `TransactionConsumer` and
`FunctionalConsumer`.
rejected/consumer.py (1)

726-758: ⚠️ Potential issue | 🟡 Minor

Add a deprecation alias for TransactionConsumer to maintain backward compatibility.

The rename from TransactionConsumer to FunctionalConsumer is a breaking change for users importing TransactionConsumer. Since the module already maintains backward compatibility with exception re-exports, add an alias before the exception section:

# Deprecated alias for backward compatibility
TransactionConsumer = FunctionalConsumer

Additionally, rename the internal method _process_transactional (line 753) to _process_concurrent or similar to align with the new class name.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rejected/consumer.py` around lines 726 - 758, Add a deprecated alias so
existing imports of TransactionConsumer keep working by assigning
TransactionConsumer = FunctionalConsumer (place this alias before the exception
re-exports as requested), and rename the internal method _process_transactional
to _process_concurrent in the FunctionalConsumer class; update all internal
references (e.g., the lambda passed in _run_consumer that currently calls
self._process_transactional) to call self._process_concurrent to keep behavior
intact.
🧹 Nitpick comments (1)
docs/migrating.md (1)

352-368: Add TransactionConsumerFunctionalConsumer to the migration checklist.

The checklist is comprehensive but should include the class rename for users of the concurrent consumer.

📝 Suggested addition
 6. Replace `SmartConsumer` / `PublishingConsumer` with `rejected.Consumer`
+6a. Replace `TransactionConsumer` with `rejected.FunctionalConsumer`
 7. Replace `tornado` imports with `asyncio` equivalents
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/migrating.md` around lines 352 - 368, Add an entry to the Quick
Migration Checklist noting the class rename from TransactionConsumer to
FunctionalConsumer so users of the concurrent consumer update references;
specifically mention replacing any uses/imports of TransactionConsumer with
FunctionalConsumer (and update docs/tests/configs accordingly), and place this
item near other consumer-related entries such as the existing notes about
SmartConsumer/PublishingConsumer → rejected.Consumer to keep related migration
steps together.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/migrating.md`:
- Around line 100-116: Update the migration note to state that
FunctionalConsumer is a rename of TransactionConsumer rather than a brand-new
class: change the heading and text to indicate "FunctionalConsumer (renamed from
TransactionConsumer)" and replace "4.0 introduces FunctionalConsumer" with
wording like "FunctionalConsumer replaces/renames TransactionConsumer from 3.x"
and add a short note instructing users that code referencing TransactionConsumer
should be updated to use FunctionalConsumer (keeping reference to
rejected.FunctionalConsumer and TransactionConsumer so readers can locate the
symbols).

In `@rejected/__init__.py`:
- Around line 34-40: The top-level export list removed TransactionConsumer which
will break users importing rejected.TransactionConsumer; restore compatibility
by re-introducing a top-level alias/export named TransactionConsumer that
references the original implementation (e.g., the class or function in its
defining module) and add a deprecation notice in its docstring or runtime
warning to guide users to the new import path, then run tests to ensure imports
still resolve (referencing symbol TransactionConsumer and __init__.py to locate
where to add the alias).

In `@rejected/models.py`:
- Line 145: The public Message constructor removed the message_type input which
breaks external callers; restore a compatibility shim by accepting a
message_type parameter in the Message initializer (or model signature) and
mapping it to the new type field when present (e.g., prefer message_type if
provided, otherwise use type), and emit a DeprecationWarning to signal it's
deprecated; update the Message class/constructor (and any factory functions) to
accept message_type: str | None and set self.type accordingly so external code
continues to work.

---

Outside diff comments:
In `@docs/migrating.md`:
- Around line 319-332: The removed APIs table is missing the rename for
TransactionConsumer; add a table row mapping `TransactionConsumer` to
`FunctionalConsumer` in the same table where other mappings like
`consumer.SmartConsumer` → `rejected.Consumer` appear so users can find
migration guidance for TransactionConsumer; ensure the new row follows the
existing pipe-delimited format and placement (near other consumer/type renames)
and uses backticks for the symbols `TransactionConsumer` and
`FunctionalConsumer`.

In `@rejected/consumer.py`:
- Around line 726-758: Add a deprecated alias so existing imports of
TransactionConsumer keep working by assigning TransactionConsumer =
FunctionalConsumer (place this alias before the exception re-exports as
requested), and rename the internal method _process_transactional to
_process_concurrent in the FunctionalConsumer class; update all internal
references (e.g., the lambda passed in _run_consumer that currently calls
self._process_transactional) to call self._process_concurrent to keep behavior
intact.

---

Nitpick comments:
In `@docs/migrating.md`:
- Around line 352-368: Add an entry to the Quick Migration Checklist noting the
class rename from TransactionConsumer to FunctionalConsumer so users of the
concurrent consumer update references; specifically mention replacing any
uses/imports of TransactionConsumer with FunctionalConsumer (and update
docs/tests/configs accordingly), and place this item near other consumer-related
entries such as the existing notes about SmartConsumer/PublishingConsumer →
rejected.Consumer to keep related migration steps together.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 89d8205b-a692-424d-b907-ea3c69605cc9

📥 Commits

Reviewing files that changed from the base of the PR and between 1b6a171 and 65d3f62.

📒 Files selected for processing (14)
  • CLAUDE.md
  • README.md
  • docs/api.md
  • docs/configuration.md
  • docs/consumer_howto.md
  • docs/index.md
  • docs/migrating.md
  • rejected/__init__.py
  • rejected/consumer.py
  • rejected/models.py
  • rejected/process.py
  • rejected/testing.py
  • tests/test_consumer.py
  • tests/test_process.py
✅ Files skipped from review due to trivial changes (3)
  • docs/index.md
  • docs/configuration.md
  • CLAUDE.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • rejected/process.py

- Update all code examples in index.md and consumer_howto.md to use
  `import rejected` / `rejected.Consumer` style
- Update pyproject.toml docs URL from readthedocs to GH Pages

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gmr gmr merged commit 2c08445 into main Apr 3, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add "debug" mode that limits messages consumed

1 participant